feat(DpathExtractor): Add RecordExpander component for nested array extraction#859
Conversation
…thExtractor - Add optional expand_records_from_field parameter to extract items from nested arrays - Add optional remain_original_record parameter to preserve parent record context - Implement _expand_record method to handle array expansion logic - Add comprehensive unit tests covering all edge cases - Maintain backward compatibility with existing functionality Co-Authored-By: unknown <>
Original prompt from API User |
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1764690419-dpath-extractor-expansion#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1764690419-dpath-extractor-expansionHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
- Create new RecordExpander class in airbyte_cdk/sources/declarative/expanders/ - Move expand_records_from_field and remain_original_record parameters from DpathExtractor to RecordExpander - Update DpathExtractor to accept optional record_expander attribute - Register RecordExpander in manifest component transformer - Update unit tests to use new RecordExpander class structure - All 24 tests passing, MyPy and Ruff checks passing This refactoring improves separation of concerns by isolating record expansion logic into a dedicated component. Co-Authored-By: unknown <>
- Add RecordExpander definition to declarative_component_schema.yaml - Add record_expander property to DpathExtractor schema - Update create_dpath_extractor in model_to_component_factory.py to handle record_expander - Auto-generate models from schema using poetry run poe build - All 24 tests passing This completes the schema registration for RecordExpander component, enabling YAML manifests to properly instantiate RecordExpander when used with DpathExtractor. Co-Authored-By: unknown <>
Apply cleaner logic using 'yield from' consistently: - When extracted is a list without record_expander, use 'yield from extracted' - Check 'if not self.record_expander' instead of nested if/else - Remove unnecessary 'yield from []' for empty case All 24 tests passing. Suggested by @DanyloGL. Co-Authored-By: unknown <>
Changes: - Add back 'else: yield from []' in DpathExtractor for explicit empty case - Update RecordExpander to return nothing when expand_records_from_field path doesn't exist or isn't a list - Update unit tests to expect no records instead of original record when expansion fails This makes RecordExpander stricter: it only emits records when successfully expanding a list. For Stripe invoice_line_items, this ensures we only emit line items, not invoice objects. All 24 tests passing. Requested by @DanyloGL. Co-Authored-By: unknown <>
Changes: 1. Remove TypeError from exception handler (only catch KeyError per dpath.get docs) 2. Add wildcard (*) support to RecordExpander for matching multiple arrays 3. Update docstring and schema to document wildcard support 4. Add 5 new unit tests for wildcard expansion scenarios 5. Regenerate models from updated schema When wildcards are used, RecordExpander: - Uses dpath.values() to find all matches - Filters for list-valued matches only - Expands items from all matched lists - Returns nothing if no list matches found All 29 tests passing. Requested by @DanyloGL. Co-Authored-By: unknown <>
MyPy was complaining that dpath.values() and dpath.get() return 'object' type. Added cast(Iterable[Any], ...) for dpath.values() and cast(Any, ...) for dpath.get() to satisfy MyPy type checking while maintaining runtime behavior. All 29 tests passing. MyPy check now passes. Co-Authored-By: unknown <>
Unified the wildcard and non-wildcard branches by collecting all arrays to process into a single list, then using one common loop for expansion. This eliminates the duplicated item iteration and record expansion logic. All 29 tests passing. MyPy check passes. Co-Authored-By: unknown <>
Changes per Daryna's feedback: 1. Removed isinstance(m, list) filter - now checking inside loop 2. Renamed 'matches' to 'extracted' 3. Removed type casts - using 'extracted: Any' instead 4. Renamed 'nested_array' to 'record' (loop var), using 'parent_record' for original 5. Removed 'if not nested_array:' check (redundant with for loop) All 29 tests passing. MyPy check passes. Co-Authored-By: unknown <>
- Add on_no_records parameter with 'skip' (default) and 'emit_parent' options - Add parent_fields_to_copy parameter to copy specific parent fields to child records - Add ParentFieldMapping class to define source/target field mappings - Update schema YAML with new properties and ParentFieldMapping definition - Regenerate models from schema - Add comprehensive unit tests for new features Co-Authored-By: unknown <>
There was a problem hiding this comment.
Pull request overview
This PR introduces a new RecordExpander component to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This addresses the Stripe connector's need to handle invoice_line_items and subscription_items streams where parent objects contain nested arrays that need to be flattened.
Changes:
- Added
RecordExpanderclass with support for wildcard paths, optional parent record embedding, configurable empty-array behavior, and selective parent field copying - Integrated
RecordExpanderwithDpathExtractorvia optionalrecord_expanderparameter - Updated schema definitions and component factory to support the new component
- Added 40 comprehensive unit tests covering expansion scenarios, wildcard paths, and edge cases
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
airbyte_cdk/sources/declarative/expanders/record_expander.py |
New core implementation of RecordExpander and ParentFieldMapping classes |
airbyte_cdk/sources/declarative/expanders/__init__.py |
Module initialization exporting new expander components |
airbyte_cdk/sources/declarative/extractors/dpath_extractor.py |
Integration of RecordExpander into DpathExtractor with optional expansion logic |
airbyte_cdk/sources/declarative/models/declarative_component_schema.py |
Auto-generated Pydantic models for new components and parameters |
airbyte_cdk/sources/declarative/declarative_component_schema.yaml |
Schema definitions for RecordExpander and ParentFieldMapping |
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py |
Factory method updates to instantiate RecordExpander from models |
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py |
Component transformer mapping for DpathExtractor.record_expander |
unit_tests/sources/declarative/extractors/test_dpath_extractor.py |
40 new unit tests for expansion, on_no_records, parent_fields_to_copy, and combined features |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
/prerelease
|
|
❌ Cannot revive Devin session - the session is too old. Please start a new session instead. |
|
/autofix
|
|
/autofix
|
…ecordExpander Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
…cordTransformations
|
/autofix
|
|
Aside — Alfredo Garcia (@agarctfi), here are some findings from reviewing this PR: Issues worth addressing
Nits / suggestions
|
Patrick Nilan (pnilan)
left a comment
There was a problem hiding this comment.
See previous comment
|
Patrick Nilan (@pnilan) Thanks! Can you check the latest commit? |
|
/ai-review |
feat(cdk): Add RecordExpander component for nested array extraction
Summary
This PR adds a new
RecordExpandercomponent to the CDK that enables extracting items from nested array fields and emitting each item as a separate record. This is needed to fix the Stripeinvoice_line_itemsandsubscription_itemsstream issues where the events endpoint returns parent objects with nested arrays, but we need to emit each child item as a separate record.Key changes:
RecordExpanderclass inairbyte_cdk/sources/declarative/expanders/OnNoRecordsenum for type-safeon_no_recordsbehaviorDpathExtractorvia optionalrecord_expanderparameter["sections", "*", "items"])remain_original_recordflag to embed parent record contexton_no_recordsparameter:OnNoRecords.skip(default) orOnNoRecords.emit_parentbehaviorExample usage:
To copy specific parent fields into expanded child records, use existing
RecordTransformations(e.g.,AddFields) downstream of the extractor rather than configuring it onRecordExpanderdirectly.Updates since last revision
ParentFieldMappingandparent_fields_to_copy: Parent field copying is now delegated to existingRecordTransformations(e.g.,AddFields), keepingRecordExpanderfocused on expansion onlyOnNoRecordsenum:on_no_recordsis now a proper enum (OnNoRecords.skip,OnNoRecords.emit_parent) instead of a raw string, for type safetyReview & Testing Checklist for Human
This is a YELLOW risk PR (medium confidence). Please verify:
on_no_recordsbehavior: Verify theemit_parentoption correctly emits the parent record when expansion path is missing, empty, or non-array. The logic at the end ofexpand_record()inrecord_expander.pyhandles this.End-to-end testing: This PR only includes unit tests. The real-world behavior needs to be verified with the Stripe connector in the companion PR (fix(source-stripe): Fix invoice_line_items and subscription_items incremental streams (do not merge) airbyte#70294). In particular, confirm that parent field copying via
RecordTransformationsworks as expected now thatParentFieldMappinghas been removed.Recommended test plan:
poetry run pytest unit_tests/sources/declarative/extractors/test_dpath_extractor.py -vsubscription_itemsandinvoice_line_itemsstreamsremain_original_recordwithon_no_records: emit_parentNotes
Design
Design Document: RecordExpander Component
PR: #859
Related issues: oncall#8683 (invoice_line_items), oncall#10756 (subscription_items)
Companion PR: airbytehq/airbyte#70294 (Stripe connector)
1. Problem Statement
Several Stripe streams (
invoice_line_items,subscription_items) use an events-based incremental sync that fetches parent objects (e.g., an Invoice) from the events endpoint. These parent objects contain nested arrays of child items (e.g.,invoice.lines.data[]). The connector needs to emit each child item as a separate record, not the parent object.Before this PR, the
DpathExtractorcould only extract records at a single path depth. It had no mechanism to "explode" a nested array within an extracted record into multiple output records. Connector developers had to write custom Python code to handle this pattern, which is common across many APIs.2. Solution Overview
The PR introduces a new
RecordExpanderdeclarative component that plugs into the existingDpathExtractor. When configured, it takes each record extracted byDpathExtractorand expands it by pulling out items from a nested array field, emitting each item as a separate record.Parent field copying (e.g., copying
invoice_idfrom the parent into each child) is intentionally not part ofRecordExpander. Instead, use existingRecordTransformationssuch asAddFieldsdownstream of the extractor to enrich expanded records with parent context.Data flow with RecordExpander:
Data flow without RecordExpander (existing behavior, unchanged):
3. Component Architecture
3.1 New Classes
OnNoRecords(Enum)Location:
airbyte_cdk/sources/declarative/expanders/record_expander.pyDefines the behavior when record expansion produces no records:
skip— Emits nothing (default)emit_parent— Emits the original parent record unchangedRecordExpander(dataclass)Location:
airbyte_cdk/sources/declarative/expanders/record_expander.pyThe core component. Given a parent record, it navigates to a nested array field and yields each element as a separate record.
Attributes:
expand_records_from_fieldSequence[str | InterpolatedString]*).remain_original_recordboolFalse"original_record"key in each child.on_no_recordsOnNoRecordsOnNoRecords.skipskipemits nothing;emit_parentemits the parent record as-is.configConfigparametersInitVar[Mapping]Key method -
expand_record(record):expand_records_from_fieldpath (resolving interpolation).*), usesdpath.values()to match multiple nested arrays.dpath.get()to retrieve the single nested value.dict, creates a shallow copy and applies parent context (via_apply_parent_context).on_no_records == OnNoRecords.emit_parent, yields the original parent record.3.2 Modified Classes
DpathExtractorA new optional attribute
record_expander: Optional[RecordExpander] = Noneis added. Theextract_records()method is modified:record_expander: Behavior is identical to before (no change).record_expander: After extracting each record from the response body, each record is passed throughrecord_expander.expand_record(), and all expanded child records are yielded instead.ModelToComponentFactorycreate_dpath_extractor(): Now checks formodel.record_expanderand instantiates aRecordExpanderif present.create_record_expander()method: Converts the Pydantic model to aRecordExpanderinstance, mappingon_no_recordsstring values to theOnNoRecordsenum.declarative_component_schema.yamlNew schema definitions added:
RecordExpander: Definesexpand_records_from_field,remain_original_record, andon_no_records(enum:skip/emit_parent).DpathExtractorupdated: New optionalrecord_expanderproperty referencingRecordExpander.Auto-generated Pydantic models
declarative_component_schema.pyupdated with generatedRecordExpanderandOnNoRecordsmodel classes.manifest_component_transformer.pyUpdated to include propagation mapping for
DpathExtractor.record_expander, ensuring parameters are correctly propagated during manifest resolution.4. YAML Manifest Usage
Basic expansion
This extracts
response.data.objectas the parent record, then expandsparent.lines.data[]into individual records.With wildcard path and parent embedding
Matches
sections[0].items[],sections[1].items[], etc. Each child gets an"original_record"key containing the full parent. If no items are found, the parent is emitted unchanged.5. Concrete Example: Stripe invoice_line_items
API response from Stripe events endpoint:
{ "data": { "object": { "id": "in_123", "customer": "cus_456", "lines": { "data": [ {"id": "il_aaa", "amount": 1000, "description": "Widget"}, {"id": "il_bbb", "amount": 2000, "description": "Gadget"} ] } } } }Manifest configuration:
Output records:
{"id": "il_aaa", "amount": 1000, "description": "Widget"} {"id": "il_bbb", "amount": 2000, "description": "Gadget"}To enrich these with the parent
invoice_id, use a downstreamAddFieldstransformation rather than configuring it onRecordExpander.6. Edge Cases and Behavior Matrix
on_no_records=skipon_no_records=emit_parent[])expand_records_from_fieldis empty/None7. Design Decisions and Trade-offs
Why a separate
RecordExpanderclass instead of inline logic inDpathExtractor?Separation of concerns.
DpathExtractorhandles response-level extraction (navigating JSON to find records).RecordExpanderhandles record-level transformation (flattening nested arrays). This keeps each class focused and testable independently, and allowsRecordExpanderto potentially be reused with other extractor types in the future.Why
dpathfor nested access?The CDK already uses
dpathextensively for path-based JSON navigation. Reusing it maintains consistency and avoids introducing new dependencies. The wildcard (*) support comes for free fromdpath.values().Why
on_no_recordsinstead of always emitting parent?Different use cases need different behavior. For Stripe
invoice_line_items, if an invoice event has no line items, we want to skip it entirely (skip). For other APIs, the parent record itself might be the meaningful output when no children exist (emit_parent).Why no
parent_fields_to_copyonRecordExpander?Parent field copying was originally part of this component but was removed to keep
RecordExpanderfocused solely on expansion. Copying parent fields into child records is already supported by existingRecordTransformations(e.g.,AddFields), which is the idiomatic CDK pattern for record enrichment. This avoids duplicating transformation logic and keeps the component composable.Why an
OnNoRecordsenum instead of raw strings?Using an enum provides type safety, IDE autocompletion, and prevents invalid values at construction time rather than at runtime.
Shallow copy behavior
dict(item)creates a shallow copy when yielding expanded child records. This means nested mutable objects are shared between the parent and child records. This is acceptable because:8. Files Changed
airbyte_cdk/sources/declarative/expanders/record_expander.pyRecordExpanderclass andOnNoRecordsenumairbyte_cdk/sources/declarative/expanders/__init__.pyairbyte_cdk/sources/declarative/extractors/dpath_extractor.pyrecord_expanderparameter and expansion logicairbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pyRecordExpanderwithOnNoRecordsenum mappingairbyte_cdk/sources/declarative/parsers/manifest_component_transformer.pyunit_tests/sources/declarative/extractors/test_dpath_extractor.py9. Testing
19 unit tests covering:
remain_original_recordflagon_no_recordswith bothOnNoRecords.skipandOnNoRecords.emit_parentDpathExtractor.extract_records()End-to-end validation with the Stripe connector is covered by the companion PR (airbyte/airbyte#70294).
Link to Devin session: https://app.devin.ai/sessions/08169cc37f9342acb410071ab8306f05
Requested by: Alfredo Garcia (@agarctfi)